/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.end2end;

import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.iterate.ScanningResultPostDummyResultCaller;
import org.apache.phoenix.iterate.ScanningResultPostValidResultCaller;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;

/**
 * ServerPagingIT tests that include some region moves while performing rs#next.
 */
@Category(NeedsOwnMiniClusterTest.class)
public class ServerPagingWithRegionMovesIT extends ParallelStatsDisabledIT {

  private static final Logger LOGGER = LoggerFactory.getLogger(ServerPagingWithRegionMovesIT.class);

  private static boolean hasTestStarted = false;
  private static int countOfDummyResults = 0;
  private static int countOfValidResults = 0;
  private static final List<String> TABLE_NAMES = Collections.synchronizedList(new ArrayList<>());

  private static class TestScanningResultPostValidResultCaller
    extends ScanningResultPostValidResultCaller {

    @Override
    public void postValidRowProcess() {
      if (
        hasTestStarted && (countOfValidResults++ % 2) == 0
          && (countOfValidResults < 17 || countOfValidResults > 28 && countOfValidResults < 40)
      ) {
        LOGGER.info("Moving regions of tables {}. current count of valid results: {}", TABLE_NAMES,
          countOfValidResults);
        TABLE_NAMES.forEach(table -> {
          try {
            moveRegionsOfTable(table);
          } catch (Exception e) {
            LOGGER.error("Unable to move regions of table: {}", table);
          }
        });
      }
    }
  }

  private static class TestScanningResultPostDummyResultCaller
    extends ScanningResultPostDummyResultCaller {

    @Override
    public void postDummyProcess() {
      if (
        hasTestStarted && (countOfDummyResults++ % 3) == 0
          && (countOfDummyResults < 17 || countOfDummyResults > 28 && countOfDummyResults < 40)
      ) {
        LOGGER.info("Moving regions of tables {}. current count of dummy results: {}", TABLE_NAMES,
          countOfDummyResults);
        TABLE_NAMES.forEach(table -> {
          try {
            moveRegionsOfTable(table);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
      }
    }
  }

  @BeforeClass
  public static synchronized void doSetup() throws Exception {
    Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
    props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
    props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, String.valueOf(2));
    props.put(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, String.valueOf(1));
    props.put(QueryServices.PHOENIX_POST_DUMMY_PROCESS,
      TestScanningResultPostDummyResultCaller.class.getName());
    props.put(QueryServices.PHOENIX_POST_VALID_PROCESS,
      TestScanningResultPostValidResultCaller.class.getName());
    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
  }

  @After
  public void tearDown() throws Exception {
    TABLE_NAMES.clear();
    hasTestStarted = false;
    countOfDummyResults = 0;
    countOfValidResults = 0;
  }

  private void assertServerPagingMetric(String tableName, ResultSet rs, boolean isPaged)
    throws SQLException {
    Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
    for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
      assertEquals(String.format("Got %s", entry.getKey()), tableName, entry.getKey());
      Map<MetricType, Long> metricValues = entry.getValue();
      Long pagedRowsCntr = metricValues.get(MetricType.PAGED_ROWS_COUNTER);
      assertNotNull(pagedRowsCntr);
      if (isPaged) {
        assertTrue(String.format("Got %d", pagedRowsCntr), pagedRowsCntr > 0);
      } else {
        assertEquals(String.format("Got %d", pagedRowsCntr), 0, (long) pagedRowsCntr);
      }
    }
    assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0);
  }

  @Test
  public void testOrderByNonAggregation() throws Exception {
    hasTestStarted = true;
    final String tablename = generateUniqueName();
    final String tenantId = getOrganizationId();

    final Date D1 = DateUtil.parseDate("1970-01-01 00:58:00");
    final Date D2 = DateUtil.parseDate("1970-01-01 01:02:00");
    final Date D3 = DateUtil.parseDate("1970-01-01 01:30:00");
    final Date D4 = DateUtil.parseDate("1970-01-01 01:45:00");
    final Date D5 = DateUtil.parseDate("1970-01-01 02:00:00");
    final Date D6 = DateUtil.parseDate("1970-01-01 04:00:00");
    final String F1 = "A";
    final String F2 = "B";
    final String F3 = "C";
    final String R1 = "R1";
    final String R2 = "R2";
    byte[][] splits =
      new byte[][] { ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D3)),
        ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D5)), };
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      String ddl = "create table " + tablename + "   (organization_id char(15) not null,"
        + "    date date not null," + "    feature char(1) not null,"
        + "    unique_users integer not null,\n" + "    transactions bigint,\n"
        + "    region varchar,\n"
        + "    CONSTRAINT pk PRIMARY KEY (organization_id, \"DATE\", feature, unique_users))";
      TABLE_NAMES.add(tablename);
      StringBuilder buf = new StringBuilder(ddl);
      if (splits != null) {
        buf.append(" SPLIT ON (");
        for (int i = 0; i < splits.length; i++) {
          buf.append("'").append(Bytes.toString(splits[i])).append("'").append(",");
        }
        buf.setCharAt(buf.length() - 1, ')');
      }
      ddl = buf.toString();
      conn.createStatement().execute(ddl);

      PreparedStatement stmt = conn.prepareStatement("upsert into " + tablename + " ("
        + "    ORGANIZATION_ID, " + "    \"DATE\", " + "    FEATURE, " + "    UNIQUE_USERS, "
        + "    TRANSACTIONS, " + "    REGION) " + "VALUES (?, ?, ?, ?, ?, ?)");
      stmt.setString(1, tenantId);
      stmt.setDate(2, D1);
      stmt.setString(3, F1);
      stmt.setInt(4, 10);
      stmt.setLong(5, 100L);
      stmt.setString(6, R2);
      stmt.execute();

      stmt.setString(1, tenantId);
      stmt.setDate(2, D2);
      stmt.setString(3, F1);
      stmt.setInt(4, 20);
      stmt.setLong(5, 200);
      stmt.setString(6, null);
      stmt.execute();

      stmt.setString(1, tenantId);
      stmt.setDate(2, D3);
      stmt.setString(3, F1);
      stmt.setInt(4, 30);
      stmt.setLong(5, 300);
      stmt.setString(6, R1);
      stmt.execute();

      stmt.setString(1, tenantId);
      stmt.setDate(2, D4);
      stmt.setString(3, F2);
      stmt.setInt(4, 40);
      stmt.setLong(5, 400);
      stmt.setString(6, R1);
      stmt.execute();

      stmt.setString(1, tenantId);
      stmt.setDate(2, D5);
      stmt.setString(3, F3);
      stmt.setInt(4, 50);
      stmt.setLong(5, 500);
      stmt.setString(6, R2);
      stmt.execute();

      stmt.setString(1, tenantId);
      stmt.setDate(2, D6);
      stmt.setString(3, F1);
      stmt.setInt(4, 60);
      stmt.setLong(5, 600);
      stmt.setString(6, null);
      stmt.execute();
      conn.commit();
    }

    String query = "SELECT \"DATE\", transactions t FROM " + tablename
      + " WHERE organization_id=? AND unique_users <= 30 ORDER BY t DESC LIMIT 2";
    try (Connection conn = DriverManager.getConnection(getUrl(), props);
      PreparedStatement statement = conn.prepareStatement(query)) {
      TestUtil.dumpTable(conn, TableName.valueOf(tablename));
      statement.setString(1, tenantId);
      try (ResultSet rs = statement.executeQuery()) {
        moveRegionsOfTable(tablename);
        assertTrue(rs.next());
        assertEquals(D3.getTime(), rs.getDate(1).getTime());
        moveRegionsOfTable(tablename);
        assertTrue(rs.next());
        moveRegionsOfTable(tablename);
        assertEquals(D2.getTime(), rs.getDate(1).getTime());
        assertFalse(rs.next());
        assertServerPagingMetric(tablename, rs, true);
      }
    }
  }

  private static void moveRegionsOfTable(String tableName) throws IOException {
    Admin admin = getUtility().getAdmin();
    List<ServerName> servers = new ArrayList<>(admin.getRegionServers());
    ServerName server1 = servers.get(0);
    ServerName server2 = servers.get(1);
    List<RegionInfo> regionsOnServer1;
    try {
      regionsOnServer1 = admin.getRegions(server1);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    List<RegionInfo> regionsOnServer2;
    try {
      regionsOnServer2 = admin.getRegions(server2);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    regionsOnServer1.forEach(regionInfo -> {
      if (regionInfo.getTable().equals(TableName.valueOf(tableName))) {
        try {
          admin.move(regionInfo.getEncodedNameAsBytes(), server2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    });
    regionsOnServer2.forEach(regionInfo -> {
      if (regionInfo.getTable().equals(TableName.valueOf(tableName))) {
        try {
          admin.move(regionInfo.getEncodedNameAsBytes(), server1);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    });
  }

  private static void moveAllRegions() throws IOException {
    Admin admin = getUtility().getAdmin();
    List<ServerName> servers = new ArrayList<>(admin.getRegionServers());
    ServerName server1 = servers.get(0);
    ServerName server2 = servers.get(1);
    List<RegionInfo> regionsOnServer1;
    try {
      regionsOnServer1 = admin.getRegions(server1);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    List<RegionInfo> regionsOnServer2;
    try {
      regionsOnServer2 = admin.getRegions(server2);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    regionsOnServer1.forEach(regionInfo -> {
      try {
        admin.move(regionInfo.getEncodedNameAsBytes(), server2);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });
    regionsOnServer2.forEach(regionInfo -> {
      try {
        admin.move(regionInfo.getEncodedNameAsBytes(), server1);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });
  }

  @Test
  public void testLimitOffsetWithSplit() throws Exception {
    hasTestStarted = true;
    final String tablename = generateUniqueName();
    final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
      "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
    String ddl =
      "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
        + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
        + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + "SPLIT ON ('e','i','o')";
    TABLE_NAMES.add(tablename);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      createTestTable(getUrl(), ddl);
      for (int i = 0; i < 26; i++) {
        conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
          + i + "," + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
      }
      conn.commit();
      int limit = 10;
      // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
      int offset = 8;
      ResultSet rs;
      rs = conn.createStatement().executeQuery(
        "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
      int i = 0;
      while (i < limit) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, true);

      // Testing query with offset + filter
      int filterCond = 10;
      rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename + " where k2 > "
        + filterCond + " order by t_id limit " + limit + " offset " + offset);
      i = 0;
      limit = 5;
      while (i < limit) {
        if (i % 4 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + filterCond + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, true);

      limit = 35;
      rs = conn.createStatement()
        .executeQuery("SELECT t_id from " + tablename + " union all SELECT t_id from " + tablename
          + " offset " + offset + " FETCH FIRST " + limit + " rows only");
      i = 0;
      while (i++ < STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[offset + i - 1], rs.getString(1));
      }
      i = 0;
      while (i++ < limit - STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[i - 1], rs.getString(1));
      }
      // no paging when serial offset
      assertServerPagingMetric(tablename, rs, true);
      limit = 1;
      offset = 1;
      rs = conn.createStatement().executeQuery(
        "SELECT k2 from " + tablename + " order by k2 desc limit " + limit + " offset " + offset);
      moveRegionsOfTable(tablename);
      assertTrue(rs.next());
      assertEquals(25, rs.getInt(1));
      assertFalse(rs.next());
      assertServerPagingMetric(tablename, rs, true);
    }
  }

  @Test
  public void testLimitOffsetWithoutSplit() throws Exception {
    hasTestStarted = true;
    final String tablename = generateUniqueName();
    final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
      "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
    String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
      + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n"
      + "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))";
    TABLE_NAMES.add(tablename);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      createTestTable(getUrl(), ddl);
      for (int i = 0; i < 26; i++) {
        conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
          + i + "," + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
      }
      conn.commit();
      int limit = 10;
      // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
      int offset = 8;
      ResultSet rs;
      rs = conn.createStatement().executeQuery(
        "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
      int i = 0;
      while (i < limit) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, false);

      // Testing query with offset + filter
      int filterCond = 10;
      rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename + " where k2 > "
        + filterCond + " order by t_id limit " + limit + " offset " + offset);
      i = 0;
      limit = 5;
      while (i < limit) {
        if (i % 4 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + filterCond + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, false);

      limit = 35;
      rs = conn.createStatement()
        .executeQuery("SELECT t_id from " + tablename + " union all SELECT t_id from " + tablename
          + " offset " + offset + " FETCH FIRST " + limit + " rows only");
      i = 0;
      while (i++ < STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[offset + i - 1], rs.getString(1));
      }
      i = 0;
      while (i++ < limit - STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[i - 1], rs.getString(1));
      }
      assertServerPagingMetric(tablename, rs, false);
      limit = 1;
      offset = 1;
      rs = conn.createStatement().executeQuery(
        "SELECT k2 from " + tablename + " order by k2 desc limit " + limit + " offset " + offset);
      moveRegionsOfTable(tablename);
      assertTrue(rs.next());
      assertEquals(25, rs.getInt(1));
      assertFalse(rs.next());
      assertServerPagingMetric(tablename, rs, true);
    }
  }

  @Test
  public void testLimitOffsetWithSplit2() throws Exception {
    hasTestStarted = true;
    final String tablename = generateUniqueName();
    final String[] STRINGS = { "a_xyz", "b_xyz", "c_xyz", "d_xyz", "e_xyz", "f_xyz", "g_xyz",
      "h_xyz", "i_xyz", "j_xyz", "k_xyz", "l_xyz", "m_xyz", "n_xyz", "o_xyz", "p_xyz", "q_xyz",
      "r_xyz", "s_xyz", "t_xyz", "u_xyz", "v_xyz", "w_xyz", "x_xyz", "y_xyz", "z_xyz" };
    String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR,\n" + "k1 INTEGER,\n"
      + "k2 INTEGER,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
      + "CONSTRAINT pk PRIMARY KEY (t_id)) " + "SPLIT ON ('e123','i123','o123')";
    TABLE_NAMES.add(tablename);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      createTestTable(getUrl(), ddl);
      for (int i = 0; i < 26; i++) {
        conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
          + (i + 1) * 2 + "," + (i + 1) * 3 + "," + (i + 2) * 2 + ",'" + STRINGS[25 - i] + "')");
      }
      conn.commit();
      int limit = 10;
      // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
      int offset = 8;
      ResultSet rs;
      rs = conn.createStatement().executeQuery(
        "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
      int i = 0;
      while (i < limit) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, true);

      // Testing query with offset + filter
      int filterCond = 30;
      rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename + " where k2 > "
        + filterCond + " order by t_id limit " + limit + " offset " + offset);
      i = 0;
      limit = 5;
      while (i < limit) {
        if (i % 4 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + 10 + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, true);

      limit = 35;
      rs = conn.createStatement()
        .executeQuery("SELECT t_id from " + tablename + " union all SELECT t_id from " + tablename
          + " offset " + offset + " FETCH FIRST " + limit + " rows only");
      i = 0;
      while (i++ < STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[offset + i - 1], rs.getString(1));
      }
      i = 0;
      while (i++ < limit - STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[i - 1], rs.getString(1));
      }
      // no paging when serial offset
      assertServerPagingMetric(tablename, rs, true);
      limit = 1;
      offset = 1;
      rs = conn.createStatement().executeQuery(
        "SELECT k2 from " + tablename + " order by k2 desc limit " + limit + " offset " + offset);
      moveRegionsOfTable(tablename);
      assertTrue(rs.next());
      assertEquals(75, rs.getInt(1));
      assertFalse(rs.next());
      assertServerPagingMetric(tablename, rs, true);
    }
  }

  @Test
  public void testLimitOffsetWithoutSplit2() throws Exception {
    hasTestStarted = true;
    final String tablename = generateUniqueName();
    final String[] STRINGS = { "a_xyz", "b_xyz", "c_xyz", "d_xyz", "e_xyz", "f_xyz", "g_xyz",
      "h_xyz", "i_xyz", "j_xyz", "k_xyz", "l_xyz", "m_xyz", "n_xyz", "o_xyz", "p_xyz", "q_xyz",
      "r_xyz", "s_xyz", "t_xyz", "u_xyz", "v_xyz", "w_xyz", "x_xyz", "y_xyz", "z_xyz" };
    String ddl =
      "CREATE TABLE " + tablename + " (t_id VARCHAR,\n" + "k1 INTEGER,\n" + "k2 INTEGER,\n"
        + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id))";
    TABLE_NAMES.add(tablename);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      createTestTable(getUrl(), ddl);
      for (int i = 0; i < 26; i++) {
        conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
          + (i + 1) * 2 + "," + (i + 1) * 3 + "," + (i + 2) * 2 + ",'" + STRINGS[25 - i] + "')");
      }
      conn.commit();
      int limit = 10;
      // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
      int offset = 8;
      ResultSet rs;
      rs = conn.createStatement().executeQuery(
        "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
      int i = 0;
      while (i < limit) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, false);

      // Testing query with offset + filter
      int filterCond = 30;
      rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename + " where k2 > "
        + filterCond + " order by t_id limit " + limit + " offset " + offset);
      i = 0;
      limit = 5;
      while (i < limit) {
        if (i % 4 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + 10 + i],
          rs.getString(1));
        i++;
      }
      assertServerPagingMetric(tablename, rs, false);

      limit = 35;
      rs = conn.createStatement()
        .executeQuery("SELECT t_id from " + tablename + " union all SELECT t_id from " + tablename
          + " offset " + offset + " FETCH FIRST " + limit + " rows only");
      i = 0;
      while (i++ < STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[offset + i - 1], rs.getString(1));
      }
      i = 0;
      while (i++ < limit - STRINGS.length - offset) {
        if (i % 3 == 0) {
          moveRegionsOfTable(tablename);
        }
        assertTrue(rs.next());
        assertEquals(STRINGS[i - 1], rs.getString(1));
      }
      // no paging when serial offset
      assertServerPagingMetric(tablename, rs, false);
      limit = 1;
      offset = 1;
      rs = conn.createStatement().executeQuery(
        "SELECT k2 from " + tablename + " order by k2 desc limit " + limit + " offset " + offset);
      moveRegionsOfTable(tablename);
      assertTrue(rs.next());
      assertEquals(75, rs.getInt(1));
      assertFalse(rs.next());
      assertServerPagingMetric(tablename, rs, true);
    }
  }

  @Test
  public void testGroupBy() throws Exception {
    hasTestStarted = true;
    final String tablename = generateUniqueName();
    TABLE_NAMES.add(tablename);
    String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
      + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) ";
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      createTestTable(getUrl(), ddl);
      for (int i = 0; i < 8; i++) {
        conn.createStatement()
          .execute("UPSERT INTO " + tablename + " values('tenant1'," + i + "," + (i + 1) + ")");
      }
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 10, 2)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 11, 2)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 12, 3)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 13, 3)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 14, 4)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 15, 4)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 16, 4)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 17, 5)");
      conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1', 18, 5)");
      conn.commit();
      TestUtil.dumpTable(conn, TableName.valueOf(tablename));
      ResultSet rs = conn.createStatement().executeQuery(
        "SELECT k2, count(*) FROM " + tablename + " where t_id = 'tenant1' group by k2");
      moveRegionsOfTable(tablename);
      Assert.assertTrue(rs.next());
      Assert.assertEquals(1, rs.getInt(1));
      Assert.assertEquals(1, rs.getInt(2));
      Assert.assertTrue(rs.next());
      Assert.assertEquals(2, rs.getInt(1));
      Assert.assertEquals(3, rs.getInt(2));
      Assert.assertTrue(rs.next());
      Assert.assertEquals(3, rs.getInt(1));
      Assert.assertEquals(3, rs.getInt(2));
      Assert.assertTrue(rs.next());
      Assert.assertEquals(4, rs.getInt(1));
      Assert.assertEquals(4, rs.getInt(2));
      Assert.assertTrue(rs.next());
      Assert.assertEquals(5, rs.getInt(1));
      Assert.assertEquals(3, rs.getInt(2));
      Assert.assertTrue(rs.next());
      Assert.assertEquals(6, rs.getInt(1));
      Assert.assertEquals(1, rs.getInt(2));
      Assert.assertTrue(rs.next());
      Assert.assertEquals(7, rs.getInt(1));
      Assert.assertEquals(1, rs.getInt(2));
      Assert.assertTrue(rs.next());
      Assert.assertEquals(8, rs.getInt(1));
      Assert.assertEquals(1, rs.getInt(2));
      Assert.assertFalse(rs.next());
      assertServerPagingMetric(tablename, rs, true);
    }
  }

  @Test
  public void testGroupByWithIndex() throws Exception {
    hasTestStarted = true;
    final String tablename = generateUniqueName();
    final String indexName = generateUniqueName();
    TABLE_NAMES.add(tablename);
    TABLE_NAMES.add(indexName);

    String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
      + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) ";
    String indexDDl = "CREATE INDEX IF NOT EXISTS " + indexName + " ON " + tablename + "(k2)";

    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      createTestTable(getUrl(), ddl);
      createTestTable(getUrl(), indexDDl);
      for (int i = 0; i < 8; i++) {
        conn.createStatement()
          .execute("UPSERT INTO " + tablename + " values('tenant1'," + i + "," + (i + 1) + ")");
      }
      conn.commit();
      ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tablename
        + " where t_id = 'tenant1' AND (k2 IN (5,6) or k2 is null) group by k2=6");
      boolean moveRegions = true;
      moveRegionsOfTable(tablename);
      moveRegionsOfTable(indexName);
      while (rs.next()) {
        if (moveRegions) {
          moveRegionsOfTable(tablename);
          moveRegionsOfTable(indexName);
          moveRegions = false;
        } else {
          moveRegions = true;
        }
        Assert.assertEquals(1, rs.getInt(1));
      }
      Assert.assertFalse(rs.next());
      assertServerPagingMetric(indexName, rs, true);
    }
  }

}
